{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**_pySpark Basics: Resampling_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_by Jeff Levy (jlevy@urban.org)_\n",
    "\n",
    "_Last Updated: 31 Jul 2017, Spark v2.1_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: This guide will demonstrate changing the frequency of observations by aggregating daily data into monthly._\n",
    "\n",
    "_Main operations used: `dtypes`, `udf`, `drop`, `groupBy`, `agg`, `withColumn`, `dateFormat`, `select`_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We begin by creating a simple dataset, where we first define a row as having three fields (columns) and then define each individual row by specifying its three entries:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import datetime\n",
    "from pyspark.sql import Row\n",
    "from pyspark.sql.functions import col\n",
    "\n",
    "row = Row(\"date\", \"name\", \"production\")\n",
    "\n",
    "df = sc.parallelize([\n",
    "    row(\"08/01/2014\", \"Kim\", 5),\n",
    "    row(\"08/02/2014\", \"Kim\", 14),\n",
    "    row(\"08/01/2014\", \"Bob\", 6),\n",
    "    row(\"08/02/2014\", \"Bob\", 3),\n",
    "    row(\"08/01/2014\", \"Sue\", 0),\n",
    "    row(\"08/02/2014\", \"Sue\", 22),\n",
    "    row(\"08/01/2014\", \"Dan\", 4),\n",
    "    row(\"08/02/2014\", \"Dan\", 4),\n",
    "    row(\"08/01/2014\", \"Joe\", 37),\n",
    "    row(\"09/01/2014\", \"Kim\", 6),\n",
    "    row(\"09/02/2014\", \"Kim\", 6),\n",
    "    row(\"09/01/2014\", \"Bob\", 4),\n",
    "    row(\"09/02/2014\", \"Bob\", 20),\n",
    "    row(\"09/01/2014\", \"Sue\", 11),\n",
    "    row(\"09/02/2014\", \"Sue\", 2),\n",
    "    row(\"09/01/2014\", \"Dan\", 1),\n",
    "    row(\"09/02/2014\", \"Dan\", 3),\n",
    "    row(\"09/02/2014\", \"Joe\", 29)\n",
    "    ]).toDF()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----+----------+\n",
      "|      date|name|production|\n",
      "+----------+----+----------+\n",
      "|08/01/2014| Kim|         5|\n",
      "|08/02/2014| Kim|        14|\n",
      "|08/01/2014| Bob|         6|\n",
      "|08/02/2014| Bob|         3|\n",
      "|08/01/2014| Sue|         0|\n",
      "|08/02/2014| Sue|        22|\n",
      "|08/01/2014| Dan|         4|\n",
      "|08/02/2014| Dan|         4|\n",
      "|08/01/2014| Joe|        37|\n",
      "|09/01/2014| Kim|         6|\n",
      "|09/02/2014| Kim|         6|\n",
      "|09/01/2014| Bob|         4|\n",
      "|09/02/2014| Bob|        20|\n",
      "|09/01/2014| Sue|        11|\n",
      "|09/02/2014| Sue|         2|\n",
      "|09/01/2014| Dan|         1|\n",
      "|09/02/2014| Dan|         3|\n",
      "|09/02/2014| Joe|        29|\n",
      "+----------+----+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('date', 'string'), ('name', 'string'), ('production', 'bigint')]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "While we have dates for each observation, you can see they are just string objects.  Defaulting to strings is quite common in pySpark dataframes, and while we can convert them to date objects using the standard Python datetime module (demonstrated below), it is often not necessary.  Whether it is worth the conversion likely depends on what other timeseries functions you plan on working with.  As an example, let's resample this data to find monthly production for each individual.\n",
    "\n",
    "First we create a new column that contains just the month and year.  This isn't quite as elegant in pySpark as it is for smaller, non-distributed data done in Pandas, but I'll comment each step carefully as we go:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#'udf' stands for 'user defined function'. it is a wrapper that takes a function you write\n",
    "#and lets pySpark apply it to every value in a dataframe column. it should be more clear\n",
    "#after we use it below\n",
    "from pyspark.sql.functions import udf\n",
    "\n",
    "#we define our own function that knows how to split apart a MM/DD/YYYY string and return a \n",
    "#MM/YYYY string.  everything in here is standard Python, and not specific to pySpark\n",
    "def split_date(whole_date):\n",
    "    \n",
    "    #this try-except handler provides some minimal fault tolerance in case one of our date \n",
    "    #strings is malformed, as we might find with real-world data. if it fails to split the\n",
    "    #date into three parts it just returns 'error', which we could later subset the data on\n",
    "    #to see what went wrong\n",
    "    try:\n",
    "        mo, day, yr = whole_date.split('/')\n",
    "    except ValueError:\n",
    "        return 'error'\n",
    "    \n",
    "    #lastly we return the month and year strings joined together\n",
    "    return mo + '/' + yr\n",
    "\n",
    "#this is where we wrap the function we wrote above in the udf wrapper\n",
    "udf_split_date = udf(split_date)\n",
    "\n",
    "#here we create a new dataframe by calling the original dataframe and specifying the new\n",
    "#column.  unlike with Pandas or R, pySpark dataframes are immutable, so we cannot simply assign\n",
    "#to a new column on the original dataframe\n",
    "df_new = df.withColumn('month_year', udf_split_date('date'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that we could easily adapt our `split_date` function above to use datetime objects.  This could be useful if we wanted to resample our data to, say, a quarterly or weekly frequency, both of which datetime objects (https://docs.python.org/2/library/datetime.html) can easily keep track of for us.  In the case of a monthly split, we would gain nothing from the extra operation.\n",
    "\n",
    "Below we see the results in our new dataframe, then we drop the original date column:"
   ]
  },
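  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough sketch of the quarterly case, the cell below shows a variant of `split_date` that parses the string with `datetime.strptime` and returns a year-quarter label.  The name `split_quarter` and the `YYYY-Qn` label format are assumptions made for illustration and are not part of the original guide.  The function is plain Python, so it could be wrapped with `udf` in the same way `split_date` was."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from datetime import datetime\n",
    "\n",
    "#hypothetical helper (not in the original guide): turn a MM/DD/YYYY string into a\n",
    "#year-quarter label such as '2014-Q3'\n",
    "def split_quarter(whole_date):\n",
    "    \n",
    "    #strptime raises ValueError on a malformed string, so we mirror the 'error'\n",
    "    #convention used by split_date\n",
    "    try:\n",
    "        parsed = datetime.strptime(whole_date, '%m/%d/%Y')\n",
    "    except ValueError:\n",
    "        return 'error'\n",
    "    \n",
    "    #months 1-3 map to quarter 1, months 4-6 to quarter 2, and so on\n",
    "    quarter = (parsed.month - 1) // 3 + 1\n",
    "    return '{0}-Q{1}'.format(parsed.year, quarter)\n",
    "\n",
    "#quick check in plain Python, no Spark required\n",
    "print(split_quarter('08/01/2014'))"
   ]
  },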
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----+----------+----------+\n",
      "|      date|name|production|month_year|\n",
      "+----------+----+----------+----------+\n",
      "|08/01/2014| Kim|         5|   08/2014|\n",
      "|08/02/2014| Kim|        14|   08/2014|\n",
      "|08/01/2014| Bob|         6|   08/2014|\n",
      "|08/02/2014| Bob|         3|   08/2014|\n",
      "|08/01/2014| Sue|         0|   08/2014|\n",
      "|08/02/2014| Sue|        22|   08/2014|\n",
      "|08/01/2014| Dan|         4|   08/2014|\n",
      "|08/02/2014| Dan|         4|   08/2014|\n",
      "|08/01/2014| Joe|        37|   08/2014|\n",
      "|09/01/2014| Kim|         6|   09/2014|\n",
      "|09/02/2014| Kim|         6|   09/2014|\n",
      "|09/01/2014| Bob|         4|   09/2014|\n",
      "|09/02/2014| Bob|        20|   09/2014|\n",
      "|09/01/2014| Sue|        11|   09/2014|\n",
      "|09/02/2014| Sue|         2|   09/2014|\n",
      "|09/01/2014| Dan|         1|   09/2014|\n",
      "|09/02/2014| Dan|         3|   09/2014|\n",
      "|09/02/2014| Joe|        29|   09/2014|\n",
      "+----------+----+----------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_new.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_new = df_new.drop('date')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we perform two steps on one line.  First we group the data - this can be done along multiple categories if desired.  So if we want to aggregate every employee's data together, leaving us with just values for August and September, we would group by `month_year` alone.  In this case let's say we want totals for each employee within each month, so we group by `month_year` and by `name` together.\n",
    "\n",
    "After that we aggregate the resulting grouped dataframe; pySpark automatically knows the operations should be performed within groups only.  We just pass a dictionary into the `.agg` method, with the key being the column name of interest and the value being the operation used to aggregate.  We'll use `sum`, but we can also use, for example, `avg`, `min` or `max`.  Note that this is done by passing the operation as a string."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_agg = df_new.groupBy('month_year', 'name').agg({'production' : 'sum'})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The aggregation can be done on more than one field using different types, just by adding the appropriate entry to the dictionary.  For example, if there was an \"hours worked\" column, we might pass a dictionary that looked like this: `{'production' : 'sum', 'hours' : 'avg'}`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+----+---------------+\n",
      "|month_year|name|sum(production)|\n",
      "+----------+----+---------------+\n",
      "|   09/2014| Sue|             13|\n",
      "|   09/2014| Kim|             12|\n",
      "|   09/2014| Bob|             24|\n",
      "|   09/2014| Joe|             29|\n",
      "|   09/2014| Dan|              4|\n",
      "|   08/2014| Kim|             19|\n",
      "|   08/2014| Joe|             37|\n",
      "|   08/2014| Dan|              8|\n",
      "|   08/2014| Sue|             22|\n",
      "|   08/2014| Bob|              9|\n",
      "+----------+----+---------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_agg.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "If you definitely want datetime objects in your dataframe (Spark currently has very limited timeseries functionality), you can accomplish it with another `udf`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import DateType\n",
    "from datetime import datetime\n",
    "\n",
    "#note the lowercase '%m' (month); an uppercase '%M' would be read as minutes\n",
    "dateFormat = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())\n",
    "    \n",
    "df_d = df.withColumn('new_date', dateFormat(col('date')))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('date', 'string'),\n",
       " ('name', 'string'),\n",
       " ('production', 'bigint'),\n",
       " ('new_date', 'date')]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_d.dtypes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(new_date=datetime.date(2014, 8, 1))]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_d.select('new_date').take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "In this case we take advantage of the `strptime` feature of the standard Python datetime module, which takes a string and a format string and returns a datetime object.  Datetime objects can be far more useful than a date as a string if you plan a lot of other timeseries operations; they allow things like subtracting two dates to get elapsed time, or separating by quarters or weeks, or accounting for time zones or leap years.\n",
    "\n",
    "Better time series functionality appears to be a priority in Spark development, and multiple options have already been proposed that would make these operations far more efficient.  Expect future versions to take better advantage of this."
   ]
  },
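  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make a couple of those operations concrete, here is a small plain-Python sketch (no Spark involved) that parses two of the sample date strings, subtracts them to get elapsed days, and looks up an ISO week number.  The variable names are illustrative only and do not appear elsewhere in this guide."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from datetime import datetime\n",
    "\n",
    "#parse two of the sample strings; '%m' is the month, '%d' the day, '%Y' the four-digit year\n",
    "start = datetime.strptime('08/01/2014', '%m/%d/%Y').date()\n",
    "end = datetime.strptime('09/02/2014', '%m/%d/%Y').date()\n",
    "\n",
    "#subtracting two dates gives a timedelta, whose .days attribute is the elapsed days\n",
    "elapsed_days = (end - start).days\n",
    "\n",
    "#isocalendar() returns (ISO year, ISO week number, ISO weekday)\n",
    "iso_week = start.isocalendar()[1]\n",
    "\n",
    "print(elapsed_days)\n",
    "print(iso_week)"
   ]
  },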
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
